{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "10fd0b6d",
   "metadata": {},
   "source": [
    "(data-ml-ingest-example)=\n",
    "\n",
    "# Example: Large-scale ML Ingest\n",
    "\n",
    "In this example, you will learn how to build, deploy and scale up a machine\n",
    "learning shuffle ingestion pipeline using\n",
    "[Ray Dataset](https://docs.ray.io/en/latest/data/dataset.html) and\n",
    "[Dataset Pipelines](https://docs.ray.io/en/latest/data/dataset-pipeline.html).\n",
    "\n",
    "In particular, we will show you:\n",
    "\n",
    "- How to build a shuffle ingestion pipeline that loads, shuffles and feeds data\n",
    "  into distributed trainers in a few lines of code;\n",
    "- How to scale the pipeline from ingesting 100MiB data to\n",
    "  500GiB data.\n",
    "\n",
    "```{image} ../images/dataset-repeat-2.svg\n",
    ":align: center\n",
    "```\n",
    "\n",
    "## Python Setup\n",
    "\n",
    "First, we'll import all of the libraries we'll be using. This step also helps us\n",
    "verify that the environment is configured correctly. If any of the imports\n",
    "are missing, an exception will be raised."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d0798693",
   "metadata": {},
   "outputs": [],
   "source": [
    "import argparse\n",
    "import tempfile\n",
    "import time\n",
    "from typing import List\n",
    "\n",
    "import pandas\n",
    "import pyarrow\n",
    "\n",
    "import ray\n",
    "from ray.data.dataset_pipeline import DatasetPipeline\n",
    "from ray.data.datasource.datasource import RandomIntRowDatasource"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "22868029",
   "metadata": {},
   "source": [
    "## Build shuffle ingestion pipeline\n",
    "\n",
    "A typical machine learning ingestion pipeline consists of the following 4\n",
    "steps:\n",
    "\n",
    "1. Load the training data from external storage;\n",
    "2. Iterate over the data for multiple epochs;\n",
    "3. In each epoch, applying global shuffle to decorrelate the data;\n",
    "4. In each epoch, split the shuffled data into shards, and feed shards to\n",
    "   distributed trainers;\n",
    "\n",
    "Let’s see how we implement such pipeline using Ray Dataset:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4b9b0a78",
   "metadata": {},
   "outputs": [],
   "source": [
    "def create_shuffle_pipeline(\n",
    "    training_data_dir: str, num_epochs: int, num_shards: int\n",
    ") -> List[DatasetPipeline]:\n",
    "\n",
    "    return (\n",
    "        ray.data.read_parquet(training_data_dir)\n",
    "        .repeat(num_epochs)\n",
    "        .random_shuffle_each_window()\n",
    "        .split(num_shards, equal=True)\n",
    "    )"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "fb87ee4c",
   "metadata": {},
   "source": [
    "We’ve now defined a ``create_shuffle_pipeline`` function that creates an\n",
    "ingestion pipeline.\n",
    "It reads ``training_data_dir``, iterates for ``num_epochs`` times,\n",
    "where in each epoch it\n",
    "shuffles and splits the training data into ``num_shards``.\n",
    "\n",
    "## Feed the pipeline into trainers\n",
    "\n",
    "Let’s also implement a ``TrainingWorker`` which consumes the shuffled data\n",
    "from each shard.\n",
    "\n",
    "For simplicity, we will define a [Ray Actor](https://docs.ray.io/en/latest/actors.html) that emulates training workers.\n",
    "Specifically,\n",
    "\n",
    "1. It takes one shard of the shuffle pipeline for training;\n",
    "2. It iterates over the shard to get a training dataset per epoch;\n",
    "3. It then consumes the dataset by batches;"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "42c68bd8",
   "metadata": {},
   "outputs": [],
   "source": [
    "@ray.remote\n",
    "class TrainingWorker:\n",
    "    def __init__(self, rank: int, shard: DatasetPipeline):\n",
    "        self.rank = rank\n",
    "        self.shard = shard\n",
    "\n",
    "    def train(self):\n",
    "        for epoch, training_dataset in enumerate(self.shard.iter_epochs()):\n",
    "            # Following code emulates epoch based SGD training.\n",
    "            print(f\"Training... worker: {self.rank}, epoch: {epoch}\")\n",
    "            for i, batch in enumerate(training_dataset.iter_batches()):\n",
    "                # TODO: replace the code for real training.\n",
    "                pass"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "72e9a762",
   "metadata": {},
   "source": [
    "## Let's run it\n",
    "\n",
    "Now let’s run the data pipeline end-to-end:\n",
    "\n",
    "First, let's parse some arguments."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2f243148",
   "metadata": {},
   "outputs": [],
   "source": [
    "parser = argparse.ArgumentParser()\n",
    "parser.add_argument(\n",
    "    \"--large-scale-test\",\n",
    "    action=\"store_true\",\n",
    "    help=\"Run large scale test (500GiB of data).\",\n",
    ")\n",
    "\n",
    "args, _ = parser.parse_known_args()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "121aa9da",
   "metadata": {},
   "source": [
    "After that, let's generate 100MiB of Parquet files,\n",
    "create the shuffle pipeline by reading those generated Parquet files,\n",
    "and use training workers to consume the pipeline."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f4bc5cbc",
   "metadata": {},
   "outputs": [],
   "source": [
    "if not args.large_scale_test:\n",
    "\n",
    "    NUM_TRAINING_WORKERS = 4\n",
    "    NUM_EPOCHS = 5\n",
    "    NUM_COLUMNS = 10\n",
    "    SIZE_100MiB = 100 * 1024 * 1024\n",
    "\n",
    "    # create a local ray cluster.\n",
    "    ray.init()\n",
    "\n",
    "    def generate_example_files(size_bytes: int) -> str:\n",
    "        tmpdir = tempfile.mkdtemp()\n",
    "        ray.data.read_datasource(\n",
    "            RandomIntRowDatasource(),\n",
    "            n=size_bytes // 8 // NUM_COLUMNS,\n",
    "            num_columns=NUM_COLUMNS,\n",
    "        ).write_parquet(tmpdir)\n",
    "        return tmpdir\n",
    "\n",
    "    example_files_dir = generate_example_files(SIZE_100MiB)\n",
    "\n",
    "    splits = create_shuffle_pipeline(\n",
    "        example_files_dir, NUM_EPOCHS, NUM_TRAINING_WORKERS\n",
    "    )\n",
    "\n",
    "    training_workers = [\n",
    "        TrainingWorker.remote(rank, shard) for rank, shard in enumerate(splits)\n",
    "    ]\n",
    "\n",
    "    # Let's run the e2e pipeline\n",
    "    start = time.time()\n",
    "    ray.get([worker.train.remote() for worker in training_workers])\n",
    "    print(f\"total ingestion time: {int(time.time() - start)}s\")\n",
    "\n",
    "    # -> Write Progress: 100%|████████████████████| 201/201 [00:00<00:00, 228.67it/s]\n",
    "    # -> Stage 0:   0%|          | 0/5 [00:00<?, ?it/s]\n",
    "    # -> Stage 0:  40%|████      | 2/5 [00:11<00:17,  5.75s/it]\n",
    "    # -> Stage 0:  60%|██████    | 3/5 [00:23<00:16,  8.15s/it]\n",
    "    # -> ...\n",
    "    # -> (TrainingWorker pid=1651600) Training... worker: 2, epoch: 0\n",
    "    # -> Stage 0:  80%|████████  | 4/5 [00:35<00:09,  9.59s/it]\n",
    "    # -> ...\n",
    "    # -> (TrainingWorker pid=1651599) Training... worker: 0, epoch: 1\n",
    "    # -> Stage 0: 100%|██████████| 5/5 [00:46<00:00, 10.34s/it]\n",
    "    # -> ...\n",
    "    # -> (TrainingWorker pid=1651387) Training... worker: 3, epoch: 4\n",
    "    # -> total ingestion time: 61s"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "b089a98c",
   "metadata": {},
   "source": [
    "## Scale the shuffle ingestion pipeline\n",
    "\n",
    "Scaling the shuffle ingestion pipeline is simple. With Ray, we can linearly\n",
    "scale the pipeline from ingesting 100MiB of data to 500GiB of data by adding\n",
    "more machines.\n",
    "\n",
    "To ingest 500GiB of data, we'll set up a Ray Cluster.\n",
    "The provided :download:`big_data_ingestion.yaml <../big_data_ingestion.yaml>`\n",
    "cluster config can be used to set up an AWS cluster with 70 CPU nodes and\n",
    "16 GPU nodes. Using following command to bring up the Ray cluster.\n",
    "\n",
    "```bash\n",
    "pip install ray boto3\n",
    "ray up big_data_ingestion.yaml\n",
    "```\n",
    "\n",
    "\n",
    "After the cluster is started, let's implement our large scale ingestion test:\n",
    "\n",
    "First, since we are runing on a cluster, let's create the pipeline from\n",
    "RandomIntRowDatasource directly. In this way we don't need to set up S3 for storing\n",
    "generated data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9590e805",
   "metadata": {},
   "outputs": [],
   "source": [
    "def create_large_shuffle_pipeline(\n",
    "    data_size_bytes: int, num_epochs: int, num_columns: int, num_shards: int\n",
    ") -> List[DatasetPipeline]:\n",
    "    # _spread_resource_prefix is used to ensure tasks are evenly spread to all\n",
    "    # CPU nodes.\n",
    "    return (\n",
    "        ray.data.read_datasource(\n",
    "            RandomIntRowDatasource(),\n",
    "            n=data_size_bytes // 8 // num_columns,\n",
    "            num_columns=num_columns,\n",
    "            _spread_resource_prefix=\"node:\",\n",
    "        )\n",
    "        .repeat(num_epochs)\n",
    "        .random_shuffle_each_window(_spread_resource_prefix=\"node:\")\n",
    "        .split(num_shards, equal=True)\n",
    "    )"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "ad9bf581",
   "metadata": {},
   "source": [
    "Now, it's time to implement the 500GiB shuffle ingestion pipeline."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "da8e2f95",
   "metadata": {},
   "outputs": [],
   "source": [
    "if args.large_scale_test:\n",
    "    NUM_TRAINING_WORKERS = 16\n",
    "    NUM_EPOCHS = 5\n",
    "    NUM_COLUMNS = 10\n",
    "    GiB = 1024 * 1024 * 1024\n",
    "    SIZE_500GiB = 500 * GiB\n",
    "    TOTAL_NUM_NODES = 70 + 16 + 1\n",
    "\n",
    "    # use the AWS cluster we just set up.\n",
    "    ray.init(address=\"auto\")\n",
    "\n",
    "    # waiting for cluster nodes to come up.\n",
    "    while len(ray.nodes()) < TOTAL_NUM_NODES:\n",
    "        print(f\"waiting for nodes to start up: {len(ray.nodes())}/{TOTAL_NUM_NODES}\")\n",
    "        time.sleep(5)\n",
    "\n",
    "    splits = create_large_shuffle_pipeline(\n",
    "        SIZE_500GiB, NUM_EPOCHS, NUM_COLUMNS, NUM_TRAINING_WORKERS\n",
    "    )\n",
    "\n",
    "    # Note we set num_gpus=1 for workers so that\n",
    "    # the workers will only run on GPU nodes.\n",
    "    training_workers = [\n",
    "        TrainingWorker.options(num_gpus=1).remote(rank, shard)\n",
    "        for rank, shard in enumerate(splits)\n",
    "    ]\n",
    "\n",
    "    start = time.time()\n",
    "\n",
    "    # Let's run the large scale test.\n",
    "    ray.get([worker.train.remote() for worker in training_workers])\n",
    "    print(f\"total ingestion time: {int(time.time() - start)}s\")\n",
    "    throughput = SIZE_500GiB * NUM_EPOCHS / (time.time() - start) / GiB\n",
    "    print(\"throughput: {0:0.2f}GiB/s\".format(throughput))"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "89f1c48e",
   "metadata": {},
   "source": [
    "Finally, let's run our pipeline on the cluster we just started:\n",
    "\n",
    "```bash\n",
    "$ ray submit ./big_data_ingestion.yaml ./big_data_ingestion.py --large-scale-test\n",
    "# -> Connecting to existing Ray cluster at address: 172.31.47.38:6379\n",
    "# -> waiting for nodes to start up: 1/87\n",
    "# -> ...\n",
    "# -> waiting for nodes to start up: 87/87\n",
    "# -> Stage 0:   0%|          | 0/5 [00:00<?, ?it/s]\n",
    "# -> Stage 0:  20%|██        | 1/5 [00:00<00:02,  1.77it/s]\n",
    "# -> Stage 0:  40%|████      | 2/5 [00:38<00:35, 11.67s/it]\n",
    "# -> Stage 0:  60%|██████    | 3/5 [01:13<00:37, 18.83s/it]\n",
    "# -> ...\n",
    "# -> (TrainingWorker pid=5084, ip=172.31.35.245) Training... worker: 12, epoch: 0\n",
    "# -> Stage 0:  80%|████████  | 4/5 [03:15<00:49, 49.63s/it]\n",
    "# -> ...\n",
    "# -> (TrainingWorker pid=5076, ip=172.31.40.190) Training... worker: 9, epoch: 1\n",
    "# -> Stage 0: 100%|██████████| 5/5 [05:02<00:00, 67.01s/it]\n",
    "# -> ...\n",
    "# -> (TrainingWorker pid=5074, ip=172.31.40.190) Training... worker: 0, epoch: 4\n",
    "# -> total ingestion time: 291s\n",
    "# -> throughput: 8.56GiB/s\n",
    "```"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
